/**
 * FileName: StructuredStreamingKafka2Kafka
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/03/13 10:12
 * Description: 使用Sql方式读取kafka数据按照process配置进行处理最终存入kafka
 *              (Reads Kafka data via Spark SQL, processes it according to the
 *              process configuration, and finally writes the result to Kafka.)
 */
package cn.com.bonc.app;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import cn.com.bonc.util.ColumnsUtil;
import cn.com.bonc.util.DataFilterAndOperatorUtil;
import cn.com.bonc.util.ProcessCfgUtil;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.metrics.sink.Sink;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import scala.Tuple2;

import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;

import static org.apache.spark.sql.functions.*;


public class StructuredStreamingKafka2Kafka {

	public static void main(String[] args) {

		SparkSession spark = SparkSession
				.builder()
				.appName("KafkaJoinRedisApp")
				.config("spark.redis.host", ConfigurationManager.getProperty(Constants.REDIS_IP))
				.config("spark.redis.port", ConfigurationManager.getProperty(Constants.REDIS_PORT))
				.getOrCreate();

		Dataset<Row> kafkaDataset = spark
				.readStream()
				.format("kafka")
				.option("kafka.bootstrap.servers", ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS))
				.option("subscribe", "stopic")
				.option("startingOffsets", ConfigurationManager.getProperty(Constants.KAFKA_AUTO_OFFSET_RESET))
				.load();

		if (ProcessCfgUtil.isJoin()){
			spark.read()
					.format("org.apache.spark.sql.redis")
					.option("table", "Xdata")
					.load()
					.registerTempTable("redis");
		}
		Dataset<Row> lineData=null;
		boolean windowAggregate = ProcessCfgUtil.isWindowAggregate();
		if(windowAggregate){
			lineData = kafkaDataset
					.select(col("value").cast("string"), col("timestamp"))
					.withColumn("tmp", split(col("value"), ColumnsUtil.getRegex()))
					.select(ColumnsUtil.getInstance().combineColumns(ColumnsUtil.getInstance().getColumns("tmp"),
							col("timestamp")))
					.drop(col("tmp"));
		}else {
			lineData=ColumnsUtil.getInstance().getMultiColumnDataset(kafkaDataset,"value");
		}

		for (String sqlStr : ProcessCfgUtil.getSqlList()) {
			lineData.createOrReplaceTempView("kafka");
			lineData = spark.sql(sqlStr);
		}
		if (windowAggregate){
			//特定窗口时长内手机号出现的次数
			lineData = lineData
					.groupBy(ProcessCfgUtil.getWindowColsWithTimestamp())
					.count();
		}

		//需要输出的列
		String sep= ConfigurationManager.getProperty(Constants.DEFAULT_DATA_SEPARATOR);
		if (ProcessCfgUtil.getSinkCols()!=null){
			lineData = lineData.select(concat_ws(sep, ProcessCfgUtil.getSinkCols()).cast("string").as("value"));
		}else {
			lineData = lineData.select(concat_ws(sep, ProcessCfgUtil.getCols(lineData.columns())).cast("string").as("value"));
		}

//		StreamingQuery query = lineData
//				.writeStream()
//				.format("kafka")
//				.outputMode(OutputMode.Update())
//				.option("checkpointLocation", ConfigurationManager.getProperty(Constants.SPARK_SINK_CHECKPOINT))
//				.option("kafka.bootstrap.servers", ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS))
//				.option("topic", ConfigurationManager.getProperty(Constants.KAFAK_SPARK_SINK_TOPICS))
//				.start();

		StreamingQuery query = lineData
				.writeStream()
				.format("console")
				.outputMode(OutputMode.Update())
				.option("truncate", "false")
				.start();

		//保持程序运行
		try {
			query.awaitTermination();
		} catch (StreamingQueryException e) {
			e.printStackTrace();
		}
	}
}